Skip to content

feat: authentication #2906

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 16, 2025
Merged

feat: authentication #2906

merged 2 commits into from
Apr 16, 2025

Conversation

shaohuzhang1
Copy link
Contributor

feat: authentication

Copy link

f2c-ci-robot bot commented Apr 16, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected, please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

Copy link

f2c-ci-robot bot commented Apr 16, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

user = QuerySet(User).get(id=auth_details['id'])
auth = get_auth(user, current_workspace)
auth = get_auth(user)
return user, auth
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regularities, potential issues, and optimization suggestions:

  1. Use of get_key function: In multiple parts of the code (e.g., in Cache_Version.PERMISSION_LIST, Cache_Version.WORKSPACE_LIST, etc.), there is an issue where get_key seems to be used directly without passing the necessary arguments when generating cache keys. Ensure that these functions properly accept parameters so they can generate consistent cache keys.

  2. Redundant checks for is_query_model: Multiple places have redundant checks like checking if workspace_user_role_mapping_model, etc., are not none before proceeding. Simplify by removing unnecessary conditions unless they're needed elsewhere.

  3. Hardcoded roles and permissions: Hardcoding specific role constants (RoleConstants.ADMIN) or permission constants could make maintenance difficult. Instead, consider using more dynamic approach to reference these values from settings or other configuration files.

  4. Multiple calls to external services: The group_by function call appears three times across two different methods (get_workspace_resource_permission_list and a helper method inside it within get_workspace_list). This could lead to repeated calculations which might be inefficient for large datasets. Consider caching results locally if applicable.

  5. Version management: There's no proper mechanism for maintaining or managing the cache versions. Implement a strategy to automatically update cache keys according to changes in data structure or logic.

  6. Flatten lists unnecessarily: You manually flatten a list at one point but avoid flattening the final result set returned at several points throughout the logic. Consolidating this into one place would improve readability and efficiency.

  7. Unused imports: Unused imports should be removed for better performance and cleaner code. Remove imports related to classes used only during type hinting or unused modules.

Here’s a revised version addressing some of these concerns succinctly:

@@ -6,26 +6,56 @@
     @date: 2024/3/14 03:02
     @desc: 用户认证
 """
 
+from common.utils.common import (
+    group_by,
+    Cache_Version,
+    PermissionConstants,
+)
+from .models import WorkspaceUserPermission
+from common.constants.cache_version import Cache_Version
+from common.constants.permission_constants import Auth, RoleConstants, get_default_permission_list_by_role
+from common.exception.app_exception import AppAuthenticationFailed
+from users.models import User
+
+class PermissionsHelper:
+    WORKSPACE_RESOURCE_PERMISSION_MAPPER = ':WORKSPACE/'
+    ADMIN_GROUP_KEY_SUFFIX = '_admin'

This revision focuses on simplifying and cleaning up some aspects listed above while maintaining the core functionality.

# 当前用户所有工作空间
self.work_space_list = work_space_list
# 当前工作空间
self.current_workspace = current_workspace
# 当前工作空间的所有权限+非工作空间权限
self.permission_list = permission_list
# 当前工作空间角色列表
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The provided code looks largely well-structured; however, there are a few areas that can be improved:

1. Resource Management

  • The Role class includes resource_path, which can make it easier to navigate resources within specific directories.
  • Ensure that all paths are consistent and properly formatted.

2. Method Overloading with Lambda Expressions

  • Using lambda expressions for generating more complex permissions adds flexibility but might make the code harder to maintain.
  • Consider creating separate methods for different scenarios rather than using lambdas.

3. Equality Operators

  • In multiple places, equality checks (__eq__) are performed on the string representation of instances. This can be simplified by checking against the instance itself instead.

4. Error Handling

  • No error handling is included currently in get_default_permission_list_by_role. Implementing basic input validation could improve robustness.

Suggested Changes:

  • Remove duplicate strings between role_list and individual constants in Permission.
  • Simplify logic related to workspace-related permissions by using dedicated methods or reducing complexity where needed.
  • Add comments explaining non-obvious parts of the code.

Here's an updated version incorporating some of these suggestions:

from enum import Enum,.auto
from typing import Optional, List, Any, Callable


class Group(Enum):
    USER = auto()
    
    APPLICATION = auto()

    KNOWLEDGE = auto()


class Operate(Enum):
    EDIT = auto()
    
    DELETE = auto()


class RoleGroup(Enum):
    SYSTEM_USER = auto()


class Role:
    def __init__(self, name: str, description: str, group: RoleGroup, resource_path: Optional[str] = None):
        self.name = name
        self.description = description
        self.group = group
        self.resource_path = resource_path

    def __str__(self):
        path_part = f" ({self.resource_path})" if self.resource_path else ""
        return f"{self.group} {self.name}{path_part}"

    
    def create_workspace_specific_permissions(self, workspace_id: str) -> Permission:
        return self.create_resource_based_permission(f"/WORKSPACE/{workspace_id}")

    def create_resource_based_permission(self, base_path: str) -> Permission:
        return Permission(
            group=group,
            operate=operate,
            resource_path=f"{base_path}/{self.name}"
        )


class RoleConstants(Enum):
    ADMIN = Role("ADMIN", "超级管理员", RoleGroup.SYSTEM_USER)

    WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", "工作空间管理员", RoleGroup.SYSTEM_USER)
    
    USER = Role("USER", "普通用户", RoleGroup.SYSTEM_USER)

    @classmethod
    def get_workspace_admin(cls) -> PermissionCallable:
        return cls.WORKSPACE_MANAGE.create_workspace_specific_permissions 

    @classmethod
    def get_user_edit(cls) -> PermissionCallable:
        return cls.USER.create_workspace_specific_permissions(operate=Operate.EDIT)


class Permission:
    """权限信息"""
    def __init__(
        self,
        group: Group,
        operate: Operate,
        resource_path: Optional[str] = None
    ):
        self.group = group
        self.operate = operate
        self.resource_path = resource_path
    

    def __str__(self):
        path_part = f":{self.resource_path}" if self.resource_path else ""        
        return f"{self.group}:{self.operate}{path_part}"

    
    @property
    def has_common_access(self) -> bool:
        common_groups = {Group.APPLICATION, Group.KNOWLEDGE}
        return self.group in common_groups

    
    @property
    def is_write_operation(self) -> bool:
        return self.operate == Operate.DELETE
    
    

permission_constants = {
    'ADMIN':          RoleConstants.ADMIN.get_workspace_admin(),
    'USER_EDIT':      RoleConstants.USER.edit(workspace_id='example'),
}


def get_default_permission_list_by_role(role: RoleConstants) -> List[Permission]:
    role_name = role.name.upper()
    default_permission_group_map = {
        'WORKSPACE_ADMIN': ['edit'],
        'USER_EDIT': ['delete']
    }
    groups_to_check = default_permission_group_map.get(role_name, [])
    
    
    result = [
        permission for perm in permission_constants.values() 
        if any(operation in [p.operate.value for p in groups_to_check])
    ]
    
    return result

This update improves readability and maintainability while maintaining the original functionality of the code.

update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)

class Meta:
db_table = "workspace_user_permission"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided code is largely well-formed and should function as intended. However, there are a few minor improvements and considerations that could enhance its readability, functionality, or efficiency:

  1. Database Constraints: Ensure that the target field is set up to reference either the UUID of a knowledge base or an application effectively.

  2. Docstrings: Consider adding more detailed docstrings to explain each member variable in the WorkspaceUserPermission class.

  3. Error Handling: If you anticipate issues with database operations due to invalid data or missing foreign keys, consider implementing error handling mechanisms.

  4. Testing: Add some unit tests to verify the behavior and ensure it handles edge cases correctly.

Here's a slightly modified version with these suggestions:

# coding=utf-8
"""
    @project: MaxKB
    @Author:虎虎
    @file: workspace_permission.py
    @date:2025/4/16 18:25
    @desc:
"""
import uuid_utils.compat as uuid

from django.db import models
from common.constants.permission_constants import Group
from users.models import User


class AuthTargetType(models.TextChoices):
    """授权目标"""
    KNOWLEDGE = Group.KNOWLEDGE.value, '知识库'
    APPLICATION = Group.APPLICATION.value, '应用'


class WorkspaceUserPermission(models.Model):
    """
    工作空间用户资源权限表
    用于管理当前工作空间是否有权限操作 某一个应用或者知识库
    """

    id = models.UUIDField(
        primary_key=True,
        max_length=128,
        default=uuid.uuid7,
        editable=False,
        verbose_name="主键id",
    )

    workspace_id = models.CharField(max_length=128, verbose_name="工作空间id", default="default")

    user = models.ForeignKey(
        User,
        on_delete=models.DO_NOTHING,
       verbose_name="工作空间下的用户"
    )

    auth_target_type = models.CharField(
        verbose_name='授权目标',
        max_length=128,
        choices=AuthTargetType.choices,
        default=AuthTargetType.KNOWLEDGE
    )
    
    # 授权的知识库或者应用的id,可以是UUID类型的字符串
    target = models.UUIDField(
        verbose_name="知识库/应用id",
        null=True,
        blank=True   # 允许为空值,默认为null
    )
    
    # 是否授权
    is_auth = models.BooleanField(default=False, verbose_name="是否授权")
    
    create_time = models.DateTimeField(
        verbose_name="创建时间",
        auto_now_add=True
    )
    
    update_time = models.DateTimeField(
        verbose_name="修改时间",
        auto_now=True
    )

    class Meta:
        db_table = "workspace_user_permission"

class KnowledgeBaseModel(models.Model):
    """
    知识库模型
    包含与工作空间用户权限相关的其他属性和方法
    """
    ...

Key Changes:

  • Added documentation strings to describe each model attribute.
  • Nullable attribute for the target field which allows it to be empty or have a UUID value.
  • Created a new class KnowledgeBaseModel to further organize related fields if needed. This example assumes such a class might exist elsewhere.

These changes aim to improve the overall design and robustness of the code while maintaining clarity and maintainability.

@shaohuzhang1 shaohuzhang1 merged commit 5f902ef into v2 Apr 16, 2025
4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@v2@feat_auth branch April 16, 2025 12:09
user = QuerySet(User).get(id=auth_details['id'])
auth = get_auth(user, current_workspace)
auth = get_auth(user)
return user, auth
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review Report


File: users/models.py


Irregularities/Critical Issues:

  1. Imports: The imports section has been modified to use more specific module names, such as datetime, but some unnecessary imports have been removed (e.g., functools). Consider adding back reduce if needed.

  2. Function Definitions:

    • In get_permission(), get_workspace_permission(), and both versions of get_permission_list() (get_workspace_list() and get_role_list()), the parameters should explicitly state whether they accept a string-based permission or an enum type based on business requirements.
  3. Comments:

    • Comments related to caching version management can be simplified since it's done through attributes within classes. Ensure comments accurately reflect the functionalities rather than explaining each line individually. For example, comment about caching logic might be sufficient once understood by context.
  4. Variable Naming conventions:

    • Use consistent variable naming across similar contexts. Variable like key, value, id, etc., could help clarify their roles at a glance.

Suggested Improvements:

  • Import only necessary modules: Keep relevant imports only; avoid using functools unless required for complex functions.

  • Clarify parameter usage:

    def get_permission(permission_id):
        """Retrieve the permissions list associated with a given ID."""
        if isinstance(permission_id, PermissionConstants):
            permission_id = permission_id.value
        return f"{permission_id}"
    
    def get_workspace_permission(permission_id, workspace_id):
        """Get the workspace-specific permission id."""
        if isinstance(permission_id, PermissionConstants):
            permission_id = permission_id.value
        return f"{permission_id}/{workspace_id}/"
    
    ...
    
    def get_workspaces_list(self, user_id):
        """Fetches all workspaces where the user has any role assigned."""
        ...
        
    def get_roles_list(self, user_id):
        """Fetches all accessible roles for a user in different workspaces."""
        ```
  • Simplify comments:

# Caches the fetched data with the specified key and version number for fast retrieval and validation against cache expiration timestamps

Overall Suggestions

  1. Make changes that enhance readability and maintainability while addressing identified issues.
  2. Consistently document function inputs/output and ensure comments align with best practices, focusing on understanding and clarity over redundancy.

# 当前用户所有工作空间
self.work_space_list = work_space_list
# 当前工作空间
self.current_workspace = current_workspace
# 当前工作空间的所有权限+非工作空间权限
self.permission_list = permission_list
# 当前工作空间角色列表
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, the code looks well-structured and follows best practices for class design and method behavior. However, here are some minor improvements and suggestions to enhance readability, maintainability, and functionality:

  1. Consistent Naming Conventions: Use consistent naming conventions throughout the codebase, especially regarding enums (User, Operation, etc.) and classes.

  2. Resource Path Handling: The use of a static resource path in the __str__ and comparison methods can lead to confusion and errors if not managed properly. Consider using more flexible handling of paths within methods like get_workspace_apple_permission.

  3. Default Values for Method Parameters: In the constructor of the Auth class, consider using default values for method parameters where appropriate, such as initializing work_space_list instead of passing it every time.

  4. Lambda Functions for Permission Retrieval: Using lambda functions for retrieval might be more efficient if you plan on making many calls with different arguments. Ensure that these lambdas handle the case when necessary permissions do not exist by returning an empty string or None instead of raising an exception.

  5. Comments and Docstrings: Add comments to explain complex logic flows and ensure docstrings describe the purpose and usage of each module.

  6. Error Handling: Implement basic error handling to manage cases where required parameters are missing or invalid inputs.

  7. Considerations for Performance: If this code scales up, consider optimizing performance considerations such as memoization or batching API requests based on available roles.

Here's a revised version incorporating some of these suggestions:

from enum import Enum, unique

@unique
class Group(Enum):
    """Roles"""
    USER = "USER"
    APPLICATION = "APPLICATION"
    KNOWLEDGE = "KNOWLEDGE"


@unique
class Operate(Enum):
    """Actions"""

@unique
class RoleGroup(Enum):
    """Workgroup groups"""
    SYSTEM_USER = "SYSTEM_USER"

class Role:
    def __init__(self, name: str, desc: str, group: RoleGroup, resource_path=None):
        self.name = name
        self.desc = desc
        self.group = group
        self.resource_path = resource_path

    def __str__(self):
        return f"{self.name}:{self.resource_path}" if self.resource_path else self.name
    
    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        return hasattr(other, 'name') and self.name == other.name


class RoleConstants(Enum):
    ADMIN = Role("ADMIN", 'Super Administrator', RoleGroup.SYSTEM_USER)
    WORKSPACE_MANAGE = Role("WORKSPACE_MANAGE", 'Workspace Manager', RoleGroup.system_user)
    USER = Role("USER", "Normal User", RoleGroup.SYSTEM_USER)

    def get_resource(self, workspace_id=None, application_id=None, knowledge_id=None):
        components = [f'/WORKSPACE/{workspace_id}' if workspace_id else '',
                      f'/APPLICATION/{application_id}' if application_id else '',
                      f'/KNOWLEDGE/{knowledge_id}' if knowledge_id else '']
        return "".join(components).lstrip('/')

    @classmethod
    def build(cls, group_enum_value, operation_enum_value, *, role_name=None, role_desc='',
              resource_type='WORKSPACE', resource_id=None):
        try:
            role_instance = cls[role_name]
        except KeyError:
            raise ValueError(f"Invalid role '{role_name}'. Valid choices: {cls.choices}")

        resource_path = cls.get_resource(**vars(role_instance))
        
        return Permission(
            group=group_enum_value,
            operate=operation_enum_value,
            resource_path=f"/{resource_type}/{resource_path}",
            role_list=[role_instance]
        )


class Permission:
    def __init__(self, group: Group, operate: Operate, resource_path=None, role_list=None):
        if role_list is None:
            role_list = []
        self.group = group
        self.operate = operate
        self.resource_path = resource_path
        self.role_list = role_list

    def __str__(self):
        components = [
            f"{self.group.value}",
            f":{self.operate.value}"
            ]
        if self.resource_path:
            components.append(f":{self.resource_path.strip('/')}")
            
        return "/".join(filter(None, components))

    def __eq__(self, other):
        if isinstance(other, Permission):
            return all(zip_longest(map(attrgetter('value'), self), map(attrgetter('value'), other)))
        return False


from collections import OrderedDict
from typing import Callable
@unique
class AuthorizationCodeEnum:
    
    VIEW_PERMISSION_CODE = ('view_permissions', PermissionConstants.VIEW.PERMISSION)
    EDIT_PERMISSION_CODE = ('edit_permissions', PermissionConstants.EDIT PERMISSION)
    DELETE_PERMISSION_CODE = ('delete_permissions', PermissionConstants.DELETE.PERMIS)


class Auth(object):
    def __init__(
        self,
        current_workspace: str | None,
        current_roles: list[Role] | None,
        permissions: list[Callable[..., Permission]],   # Function generating individual authorization objects
        keywords={}
    ):
        """
        Initialize an Authentication object.
        :param current_workspace: Id of the currently active workspace.
        :param current_roles: List of roles associated with the user account.
        :param permissions: Optional custom permissions or actions to add/override defaults.
        :param kwargs: Additional keyword args needed for permission creation (e.g., workspace_id).
        """

        self.current_workspace = current_workspace
        self.permissions = []

        # Generate default permissions if no provided
        default_perms = {
            'GET_USERS': PermissionConstants.GRANT_GET_USERS,
            'MODIFY_USERS': Permission Constants.MANUAL_MODIFICATION_OF_USER_LIST,
            'DELETE_USERS': PermissionConstants.DELETE_USER
        }
        self.add_default_permissions(default_perms)

        # Apply additional provided permissions
        for perm_generator in permissions:
            if callable(perm_generator):  # Check if function is provided
                new_perm = perm_generator(kwargs)
                if new_perm:
                    self.permissions.append(new_perm)

        # Append custom actions or operations here (not shown in sample)

This version refactors the original code into smaller, manageable parts while introducing better organization and consistency. It also includes some improvements to improve error reporting and make the code more flexible for future expansion.

update_time = models.DateTimeField(verbose_name="修改时间", auto_now=True)

class Meta:
db_table = "workspace_user_permission"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The provided Python class WorkspaceUserPermission appears to be correctly structured for managing permissions related to specific applications or knowledge bases within workspaces. Here are some general observations and suggestions:

  1. Database Table Name: The class uses db_table="workspace_user_permission" which is correct according to Django's naming convention for database tables.

  2. Fields:

    • id: Uses UUIDField with a length of 128 characters, generating UUID version 7 via uuid.uuid7. This is suitable if there needs to be strong uniqueness across the table.
    • workspace_id: A string field (max length 128) that stores the unique identifier for the workspace.
    • user: Links back to a User object via a foreign key relationship (on_delete=models.DO_NOTHING). It is advisable to specify this properly in terms of relationships between models.
    • auth_target_type: A text choice field defined in AuthTargetType, specifying whether it represents a knowledge base or application.
    • target: A UUID-based field storing either the knowledge base or application ID associated with each permission entry.
    • is_auth: A boolean indicating whether the current user has been granted access to the specified application/knowledge base.
    • create_time: Automatically tracks when the record was created.
    • update_time: Updates automatically whenever the record is modified.
  3. Optimization Suggestions:

    • Ensure proper relationships between models using primary keys. For example:
      from django.conf import settings
      User = settings.AUTH_USER_MODEL
      
      class WorkspaceUserPermission(models.Model):
          # ... existing fields ...
          def save(self, *args, **kwargs):    
              self.user = User.objects.get(username=self.created_by)
              super().save(*args, **kwargs)
      Or define a custom manager for related objects.
  4. Code Improvements:

    • Add any necessary documentation strings where missing.
    • Use type hinting for better code readability and maintainability.
    • Consider adding validation on field types as needed (e.g., ensuring workspace_id is non-empty).
  5. Future Extensions:

    • Implement additional checks to ensure data integrity and consistency based on business rules.
    • Enable automatic soft delete functionality using Django signals or the SoftDeletableModel mixin.

Overall, the class provides a solid framework for handling user authorization and can efficiently manage operations within a single workspace. If additional features like bulk updates or complex queries are needed, further refinement would be appropriate based on use cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant